package arp.message.kafka;

import java.io.ByteArrayOutputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTObjectOutput;

import arp.process.publish.Message;

public class FSTSerializationStrategy implements
		KafkaMessageSerializationStrategy<byte[]> {

	private FSTConfiguration fstConf;

	public FSTSerializationStrategy() {
		fstConf = FSTConfiguration.createJsonConfiguration();
		fstConf.setForceSerializable(true);
	}

	@Override
	public void configValueDeserializerClass(Properties props) {
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
				ByteArraySerializer.class);
	}

	@Override
	public ProducerRecord<String, byte[]> serialize(String processDesc,
			Message message) throws Exception {
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		FSTObjectOutput oos = fstConf.getObjectOutput(bos);
		oos.writeObject(message);
		oos.flush();
		return new ProducerRecord<>(processDesc, bos.toByteArray());
	}

}
